#include "config.h"
#include "configure.h"
#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "lich_qos.h"
#include "recovery.h"
#include "volume_proto_readcache.h"
#include "row3_volume_proto_io.h"

/** IO interfaces.
 *
 * Requests are dispatched on the volume format:
 * + RAW  (the fallback for every format not listed below)
 * + LSV  (log-structured volume)
 * + ROW2
 * + ROW3
 */

#ifdef USE_ROW2222222222222222222222222222
/**
 * Legacy write entry point (read-cache variant).
 *
 * Only compiled when USE_ROW2222222222222222222222222222 is defined. This
 * file defines volume_proto_write() a second time, unconditionally, further
 * down, so defining that macro would produce two definitions of the same
 * function.
 *
 * Steps, in order: readiness check, optional recovery QoS
 * (RECOVERY_IO_QOS), write throttling, format dispatch (LSV / ROW2 / raw),
 * and finally a read-cache update when readcache_ready() reports true.
 *
 * @param volume_proto  volume being written
 * @param io            request descriptor; io->size must equal buf->len
 * @param buf           data to write
 * @return 0 on success, otherwise the error code of the failing step.
 *         ENOENT and EBUSY from volume_proto_check_ready() are returned to
 *         the caller as EAGAIN.
 */
int volume_proto_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf)
{
        int ret;
        chkinfo_t *chkinfo;
        volume_format_t format;
        fileinfo_t *finfo;
        lsv_volume_proto_t *lsv = &volume_proto->table1.lsv_info;

        YASSERT(io->offset >= 0 && io->size >= 0);
        YASSERT(io->size == buf->len);

        finfo = &volume_proto->table1.fileinfo;
        format = lich_volume_format(finfo);

        ret = volume_proto_check_ready(volume_proto);
        if (unlikely(ret)) {
                /* ENOENT / EBUSY are handed back to the caller as EAGAIN. */
                ret = (ret == ENOENT || ret == EBUSY) ? EAGAIN : ret;
                GOTO(err_ret, ret);
        }

#ifdef RECOVERY_IO_QOS
        ret = recovery_qos_apply(volume_proto->table1.pool);
        if (ret) {
                GOTO(err_ret, ret);
        }
#endif

        ret = volume_proto_throt_request(volume_proto, THROT_WRITE, io->size);
        if (ret) {
                GOTO(err_ret, ret);
        }

        DBUG("ino %ju format %d off %ju size %u\n", io->id.id, format, io->offset, io->size);
        YASSERT(io->size == buf->len);

        /* Counted before the backend is called, so failed writes count too. */
        lsv->row2_stat.write++;

        /* Pick the backend; one shared error check follows the chain. */
        if (format == VOLUME_FORMAT_LSV) {
                ret = lsv_volume_proto_write(volume_proto, io, buf);
        } else if (format == VOLUME_FORMAT_ROW2) {
                ret = row2_volume_proto_write(volume_proto, io, buf);
        } else {
                io_opt_t raw_opt;

                io_opt_init(&raw_opt, 0, 0, 0, 0);
                ret = volume_proto_write_raw(volume_proto, &raw_opt, io, buf);
        }

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (readcache_ready()) {
                YASSERT(buf->len == io->size);

                /* Store the written data in the read cache under the current info_version. */
                chkinfo = volume_proto->table1.table_proto->chkinfo;
                ret = readcache_save(io, chkinfo->info_version, buf);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

#if 0
        ret = readcache_check(io, chkinfo->info_version, buf);
        if (unlikely(ret)) {
        }
#endif

        volume_proto_info(volume_proto);
        return 0;

err_ret:
        return ret;
}

/**
 * Legacy read entry point (read-cache variant).
 *
 * Only compiled when USE_ROW2222222222222222222222222222 is defined. This
 * file defines volume_proto_read() a second time, unconditionally, further
 * down, so defining that macro would produce two definitions of the same
 * function.
 *
 * When readcache_ready() reports true the request is split:
 *
 *   io ->   head   |  need_load  |   tail
 *            |           |            |
 *           \/          \/           \/
 *          cache      backend       cache
 *
 * readcache_io_split() provides `head` and `tail` and describes the
 * remaining range in `need_load`. That range is read from the backend into
 * `maximum`, saved to the read cache, and the three pieces are then moved
 * into @buf in order. Without the cache the whole request is read straight
 * into @buf.
 *
 * @param volume_proto  volume being read
 * @param io            request descriptor (offset / size)
 * @param buf           destination buffer; asserted to be empty on entry
 *                      when the cache path is taken
 * @return 0 on success, otherwise the error code of the failing step.
 *         ENOENT and EBUSY from volume_proto_check_ready() are returned to
 *         the caller as EAGAIN.
 */
int volume_proto_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret;
        io_t need_load;
        buffer_t head, maximum, tail;
        fileinfo_t *fileinfo = &volume_proto->table1.fileinfo;
        volume_format_t format = lich_volume_format(fileinfo);
        /* Loaded only on the cache path, after the readiness check has passed
         * (same ordering as the write path). */
        chkinfo_t *chkinfo = NULL;
        lsv_volume_proto_t *lsv_info = &volume_proto->table1.lsv_info;

        const io_t *io_in;
        buffer_t *buf_in;

        int enable_cache = readcache_ready();
        /* Set once head/maximum/tail have gone through mbuffer_init(). The
         * error path must not call mbuffer_free() on them before that: the
         * early failures below jump to err_ret while they are still
         * uninitialized stack objects. */
        int bufs_inited = 0;

        ret = volume_proto_check_ready(volume_proto);
        if (unlikely(ret)) {
                /* ENOENT / EBUSY are handed back to the caller as EAGAIN. */
                if (ret == ENOENT || ret == EBUSY) {
                        ret = EAGAIN;
                }
                GOTO(err_ret, ret);
        }

#ifdef RECOVERY_IO_QOS
        ret = recovery_qos_apply(volume_proto->table1.pool);
        if (ret) {
                GOTO(err_ret, ret);
        }
#endif

        ret = volume_proto_throt_request(volume_proto, THROT_READ, io->size);
        if (ret) {
                GOTO(err_ret, ret);
        }

        DBUG("ino %ju format %u off %ju size %u\n", io->id.id, format, io->offset, io->size);

        lsv_info->row2_stat.read++;

        if (unlikely(enable_cache)) {
                mbuffer_init(&head, 0);
                mbuffer_init(&maximum, 0);
                mbuffer_init(&tail, 0);
                bufs_inited = 1;

                chkinfo = volume_proto->table1.table_proto->chkinfo;

                ret = readcache_io_split(io, chkinfo->info_version, &head, &tail, &need_load);
                if (unlikely(ret)) {
                        DWARN("read cache io split fail!\n");
                        YASSERT(0);
                }

                DBUG("head %u load %u tail %u\n", head.len, need_load.size, tail.len);
                /* Only the part not served by the cache goes to the backend. */
                io_in = &need_load;
                buf_in = &maximum;
        } else {
                buf_in = buf;
                io_in = io;
        }

        if (io_in->size > 0) {
                // TODO io.flags?
                if (format == VOLUME_FORMAT_LSV) {
                        ret = lsv_volume_proto_read(volume_proto, io_in, buf_in);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else if (format == VOLUME_FORMAT_ROW2) {
                        ret = row2_volume_proto_read(volume_proto, io_in, buf_in);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else {
                        ret = volume_proto_read_raw(volume_proto, io_in, buf_in);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                if (unlikely(enable_cache)) {
                        /* Save what was just loaded from the backend into the cache. */
                        ret = readcache_save(io_in, chkinfo->info_version, buf_in);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }
                }
#if 0
                YASSERT(need_load.size == maximum.len);
                uint32_t crc = mbuffer_crc(&maximum, 0, need_load.size);
                DINFO("load off %ju size %u crc %u\n", need_load.offset, need_load.size, crc);
#endif
        }

        if (enable_cache) {
                YASSERT(need_load.size == maximum.len);
                YASSERT(head.len + maximum.len + tail.len == io->size);
                YASSERT(buf->len == 0);

                /* Reassemble the reply in request order: head, loaded middle, tail. */
                mbuffer_pop(&head, buf, head.len);
                if (need_load.size > 0) {
                        mbuffer_pop(&maximum, buf, maximum.len);
                }
                mbuffer_pop(&tail, buf, tail.len);

                YASSERT(buf->len == io->size);

#if 0
                {
                        void *_buf;
                        ymalloc(&_buf, io->size);
                        mbuffer_get(buf, _buf, io->size);

                        uint64_t off = io->offset;
                        int left = io->size;
                        int idx = 0, step;

                        while (left > 0) {
                                step = _min(left, 4096 - off % 4096);

                                uint32_t crc = 0;
                                if (step == 4096) {
                                        crc = page_crc32(_buf + idx);
                                }

                                DINFO("io read lba %ju step %u crc %u\n", io->offset + idx, step, crc);

                                off += step;
                                idx += step;
                                left -= step;
                        }
                }
#endif

                mbuffer_free(&head);
                mbuffer_free(&maximum);
                mbuffer_free(&tail);
        }

        return 0;

err_ret:
        /* Free the scratch buffers only if they were actually initialized. */
        if (bufs_inited) {
                mbuffer_free(&head);
                mbuffer_free(&maximum);
                mbuffer_free(&tail);
        }

        return ret;
}

#endif

/**
 * Write @buf to the volume range described by @io.
 *
 * The request is handed to the backend that matches the volume format:
 * LSV, ROW2, ROW3, or the raw path for every other format. When ENABLE_QOS
 * is set the request first goes through volume_proto_io_enter(); when
 * ENABLE_QOS_VOL is set it is also passed to the write throttle.
 *
 * volume_proto_io_exit() runs on every return path, including the one taken
 * when volume_proto_io_enter() itself fails and in builds where ENABLE_QOS
 * is off.
 * NOTE(review): confirm volume_proto_io_exit() is safe without a preceding
 * successful volume_proto_io_enter().
 *
 * @param volume_proto  volume being written
 * @param io            request descriptor (offset / size)
 * @param buf           data to write
 * @return 0 on success, otherwise the error code of the failing step.
 */
int volume_proto_write(volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf)
{
        int ret;
        fileinfo_t *info = &volume_proto->table1.fileinfo;
        const volume_format_t format = lich_volume_format(info);

#if ENABLE_QOS
        ret = volume_proto_io_enter(volume_proto, io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

#if ENABLE_QOS_VOL
        ret = volume_proto_throt_request(volume_proto, THROT_WRITE, io->size);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        /*
         * Profiling hooks (PROFILE_BEGIN / PROFILE_COLLECT on
         * volume_proto->profile_io_write) used to wrap this dispatch and are
         * currently disabled.
         *
         * The non-raw formats are hinted as the uncommon case; one shared
         * error check follows the chain.
         */
        if (unlikely(format == VOLUME_FORMAT_LSV)) {
                ret = lsv_volume_proto_write(volume_proto, io, buf);
        } else if (unlikely(format == VOLUME_FORMAT_ROW2)) {
                ret = row2_volume_proto_write(volume_proto, io, buf);
        } else if (unlikely(format == VOLUME_FORMAT_ROW3)) {
                ret = row3_volume_proto_write(volume_proto, io, buf);
        } else {
                io_opt_t raw_opt;

                io_opt_init(&raw_opt, 0, 0, 0, 0);
                ret = volume_proto_write_raw(volume_proto, &raw_opt, io, buf);
        }

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto_io_exit(volume_proto);
        return 0;

err_ret:
        volume_proto_io_exit(volume_proto);
        return ret;
}

/**
 * Read the volume range described by @io into @buf.
 *
 * The request is handed to the backend that matches the volume format:
 * LSV, ROW2, ROW3, or the raw path for every other format. When ENABLE_QOS
 * is set the request first goes through volume_proto_io_enter(); when
 * ENABLE_QOS_VOL is set it is also passed to the read throttle.
 *
 * volume_proto_io_exit() runs on every return path, including the one taken
 * when volume_proto_io_enter() itself fails and in builds where ENABLE_QOS
 * is off.
 * NOTE(review): confirm volume_proto_io_exit() is safe without a preceding
 * successful volume_proto_io_enter().
 *
 * @param volume_proto  volume being read
 * @param io            request descriptor (offset / size)
 * @param buf           destination buffer
 * @return 0 on success, otherwise the error code of the failing step.
 */
int IO_FUNC volume_proto_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret;
        fileinfo_t *info = &volume_proto->table1.fileinfo;
        const volume_format_t format = lich_volume_format(info);

#if ENABLE_QOS
        ret = volume_proto_io_enter(volume_proto, io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

#if ENABLE_QOS_VOL
        ret = volume_proto_throt_request(volume_proto, THROT_READ, io->size);
        if (ret) {
                GOTO(err_ret, ret);
        }
#endif

        /* Pick the backend; one shared error check follows the chain. */
        if (unlikely(format == VOLUME_FORMAT_LSV)) {
                ret = lsv_volume_proto_read(volume_proto, io, buf);
        } else if (unlikely(format == VOLUME_FORMAT_ROW2)) {
                ret = row2_volume_proto_read(volume_proto, io, buf);
        } else if (format == VOLUME_FORMAT_ROW3) {
                ret = row3_volume_proto_read(volume_proto, io, buf);
        } else {
                ret = volume_proto_read_raw(volume_proto, io, buf);
        }

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto_io_exit(volume_proto);
        return 0;

err_ret:
        volume_proto_io_exit(volume_proto);
        return ret;
}

/**
 * Unmap the volume range described by @io.
 *
 * Always forwards to volume_proto_unmap_raw() with an io_opt_t initialized
 * from all-zero arguments. Unlike the read/write entry points in this file,
 * no volume-format dispatch and no QoS / throttle step happens here.
 * NOTE(review): confirm the raw path is intended for every volume format.
 *
 * @param volume_proto  volume to operate on
 * @param io            request descriptor (offset / size)
 * @return 0 on success, otherwise the error code from
 *         volume_proto_unmap_raw().
 */
int volume_proto_unmap(volume_proto_t *volume_proto, const io_t *io)
{
        int ret;
        io_opt_t raw_opt;

        io_opt_init(&raw_opt, 0, 0, 0, 0);

        ret = volume_proto_unmap_raw(volume_proto, &raw_opt, io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;

err_ret:
        return ret;
}
